package cn.doitedu.rtdw.data_etl;

import beans.EventBean;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import java.util.Map;

/**
 * Flink ETL job: light aggregation for page access-duration (dwell time) analysis.
 *
 * <p>Reads widened behavior events (JSON) from the Kafka topic {@code mall-events-wide}, groups them
 * by (user_id, session_id) and keeps, per key, a "current page" bean carrying the page start/end
 * time and a session cut id. Every incoming event emits the updated bean; a {@code page_load} event
 * additionally emits, beforehand, a virtual record that closes the previous page. The resulting
 * stream is written to the Doris table {@code dws.mall_tfc_acc_tl}.
 *
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/02/05
 * @Tips: Learn big data at Doitedu (多易教育)
 * @Desc:
 *   Access-duration analysis: light-aggregation computation job.
 **/
public class E03_EtlJob_TrafficAccTimeLongAgg {

    /**
     * Processing-time idle period after which the per-(user, session) state is dropped when no
     * {@code app_close} event was received (e.g. the client crashed). Currently 1 hour.
     */
    private static final long SESSION_IDLE_TIMEOUT_MS = 60 * 60 * 1000L;

    /**
     * Builds and runs the job: Kafka source -> JSON to EventBean -> keyed page-time enrichment ->
     * Doris sink.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Build the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Read the event detail records from Kafka
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                // A fresh group id per run: combined with latest() every run starts at the topic head
                .setGroupId("gpac01"+System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("mall-events-wide")
                .build();
        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "s");

        // Parse each JSON record into an EventBean
        SingleOutputStreamOperator<EventBean> beans = ds.map(s -> JSON.parseObject(s, EventBean.class));

        // Group events of the same user and the same session together
        KeyedStream<EventBean, Tuple2<Integer, String>> keyedStream = beans.keyBy(new KeySelector<EventBean, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> getKey(EventBean bean) throws Exception {
                return Tuple2.of(bean.getUser_id(), bean.getSession_id());
            }
        });

        // Attach page start/end time fields to every record; on a page_load event, first emit an
        // extra virtual record that closes the previous page.
        SingleOutputStreamOperator<EventBean> resultBeanStream = keyedStream.process(new KeyedProcessFunction<Tuple2<Integer, String>, EventBean, EventBean>() {

            /** Per-key bean describing the page currently being viewed in this session. */
            ValueState<EventBean> valueState;
            /** Fire time of the currently registered idle-cleanup timer for this key. */
            ValueState<Long> timerState;

            @Override
            public void open(Configuration parameters) throws Exception {

                /*
                 * State lifetime:
                 * If the per-user state bean is never cleared, it occupies Flink state storage forever.
                 * A fixed TTL is too mechanical: it could drop the bean while the session is still
                 * active, and later events would no longer find their page start time.
                 * Therefore:
                 *   1. On an app_close event the key's state is cleared explicitly.
                 *   2. Without app_close, if this user's session receives no event for
                 *      SESSION_IDLE_TIMEOUT_MS (1 hour, processing time), the client has probably
                 *      crashed, and a timer clears the state.
                 */
                valueState = getRuntimeContext().getState(new ValueStateDescriptor<EventBean>("bean", EventBean.class));

                // State holding the fire time of the registered idle timer, so it can be deleted later
                timerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Long.class));

            }

            @Override
            public void processElement(EventBean logBean, KeyedProcessFunction<Tuple2<Integer, String>, EventBean, EventBean>.Context ctx, Collector<EventBean> out) throws Exception {

                // Reset the idle-cleanup timer: delete the previous one, register a new one
                Long previousTimer = timerState.value();
                if (previousTimer != null) {
                    ctx.timerService().deleteProcessingTimeTimer(previousTimer);
                }
                long nextTimerTime = ctx.timerService().currentProcessingTime() + SESSION_IDLE_TIMEOUT_MS;
                ctx.timerService().registerProcessingTimeTimer(nextTimerTime);
                timerState.update(nextTimerTime);

                // Events without a properties map get a null page_url instead of failing with an NPE
                logBean.setPage_url(logBean.getProperties() == null ? null : logBean.getProperties().get("url"));
                long currentTime = logBean.getEvent_time();

                EventBean stateBean = valueState.value();

                // State is scoped to the (user_id, session_id) key, so an empty state means this is
                // the first event of the session (or the first one after app_close / idle cleanup).
                if (stateBean == null) {
                    // Initialize the extension fields; this event becomes the state bean
                    logBean.setPage_end_time(currentTime);
                    logBean.setPage_start_time(currentTime);
                    logBean.setSession_cut_id(logBean.getSession_id() + "-" + currentTime);
                    stateBean = logBean;
                }
                // page_load: a new page is opened
                else if ("page_load".equals(logBean.getEvent_id())) {
                    // 1. Close the previous page and emit it right away (virtual closing record).
                    //    NOTE(review): the same object is mutated after collect(); this relies on
                    //    object reuse being disabled (Flink default), which copies/serializes on emit.
                    stateBean.setEvent_time(currentTime);
                    stateBean.setPage_end_time(currentTime);
                    out.collect(stateBean);

                    // 2. Start the new page: its url, type and start time come from this event
                    stateBean.setPage_start_time(currentTime);
                    stateBean.setPage_url(logBean.getPage_url());
                    stateBean.setPage_type(logBean.getPage_type());
                }
                // wake_up: the app resumes, which starts a new session cut
                else if ("wake_up".equals(logBean.getEvent_id())) {
                    // Reset event time, page start/end time and the session cut id
                    stateBean.setEvent_time(currentTime);
                    stateBean.setPage_start_time(currentTime);
                    stateBean.setPage_end_time(currentTime);
                    stateBean.setSession_cut_id(logBean.getSession_id() + "-" + currentTime);
                }
                // Any other event only extends the current page
                else {
                    stateBean.setEvent_time(currentTime);
                    stateBean.setPage_end_time(currentTime);
                }

                // Emit the current page bean
                out.collect(stateBean);

                if ("app_close".equals(logBean.getEvent_id())) {
                    // Session finished: drop the state and the now-unneeded idle timer
                    valueState.clear();
                    ctx.timerService().deleteProcessingTimeTimer(nextTimerTime);
                    timerState.clear();
                } else {
                    // Persist the (possibly mutated) bean explicitly: value() may return a
                    // deserialized copy (e.g. RocksDB backend), so in-place mutation alone is lost.
                    valueState.update(stateBean);
                }
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<Tuple2<Integer, String>, EventBean, EventBean>.OnTimerContext ctx, Collector<EventBean> out) throws Exception {
                // Idle timeout reached without app_close: clear this key's state
                valueState.clear();
                timerState.clear();
                System.out.println("定时器被触发了，此时状态已经被清空了");
            }
        });


        // Register the result stream as a logical table
        tableEnv.createTemporaryView("tmp_view",resultBeanStream);

        // Logical mapping table for the Doris target table.
        // NOTE(review): session_cut_id is session_id + "-" + a 13-digit millisecond timestamp, so it
        // is likely longer than 20 characters; confirm the Doris column length of dws.mall_tfc_acc_tl.
        tableEnv.executeSql(
                " CREATE TABLE tfc_tl_dorissink(         "
                        +"     dt  DATE                                  "
                        +"     ,user_id            INT                   "
                        +"     ,device_type        VARCHAR(20)           "
                        +"     ,release_channel    VARCHAR(20)           "
                        +"     ,session_id         VARCHAR(20)           "
                        +"     ,session_cut_id     VARCHAR(20)           "
                        +"     ,page_type          VARCHAR(20)           "
                        +"     ,page_url           VARCHAR(100)          "
                        +"     ,page_start_time    BIGINT                "
                        +"     ,page_end_time      BIGINT                "
                        +" ) WITH (                                      "
                        +"    'connector' = 'doris',                     "
                        +"    'fenodes' = 'doitedu:8030',                "
                        +"    'table.identifier' = 'dws.mall_tfc_acc_tl',"
                        +"    'username' = 'root',                       "
                        +"    'password' = '',                           "
                        +"    'sink.label-prefix' = 'doris_tl"+System.currentTimeMillis()+"')"
        );

        // Attach the INSERT pipeline to the DataStream environment instead of calling
        // tableEnv.executeSql(INSERT ...). executeSql would submit its own job, and the following
        // env.execute() would then submit the map/process transformations still registered on env
        // again, as a second, sink-less job. With attachAsDataStream(), env.execute() submits
        // everything as ONE job.
        tableEnv.createStatementSet()
                .addInsertSql(
                        " INSERT INTO tfc_tl_dorissink                                    "
                        +" SELECT                                                                  "
                        +" TO_DATE(date_format(to_timestamp_ltz(event_time,3),'yyyy-MM-dd')) AS dt "
                        +" ,user_id                                                                "
                        +" ,device_type                                                            "
                        +" ,release_channel                                                        "
                        +" ,session_id                                                             "
                        +" ,session_cut_id                                                         "
                        +" ,page_type                                                              "
                        +" ,page_url                                                               "
                        +" ,page_start_time                                                        "
                        +" ,page_end_time                                                          "
                        +" FROM tmp_view                                                           ")
                .attachAsDataStream();

        env.execute();

    }

}
